Passed
Push — master ( 1b1907...0e6aeb )
by Michael
01:30
created

WebSocketClient.resetIdle   A

Complexity

Conditions 3

Size

Total Lines 6
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 6
dl 0
loc 6
rs 10
c 0
b 0
f 0
cc 3
1
import './types'
2
import { Zipnum } from 'zipnum'
3
import { add_event, rm_event, sett } from './utils'
4
import { processConfig } from './config'
5
import { AnyFunc, AnyObject, both, callWith, F, isNil, notf, once, qfilter, T, typeIs } from 'pepka'
6
7
/** Largest signed 32-bit integer (2**31 - 1); upper bound for the random numbers message ids are built from. */
const MAX_32 = 0x7fffffff
const random = Math.random
/** Shared encoder used to turn a random integer into a message id string. */
const zipnum = new Zipnum()
/** `callWith([])` from pepka — used to invoke queued callbacks with an empty argument list. */
const callit = callWith([])
/** Predicate combining `typeIs('Number')` with "not NaN". */
const isNumber = both(typeIs('Number'), notf(isNaN))
/** Send options that mark an outgoing message as a keep-alive ping. */
const ping_send_opts: wsc.SendOptions = { _is_ping: true }
13
14
/** Listener for the native WebSocket event named `T`. */
type EventHandler<T extends keyof WebSocketEventMap> = AnyFunc<any, [WebSocketEventMap[T]]>

/** Registered listeners, grouped by the event name they were subscribed to. */
type EventHandlers = {
  open: Array<EventHandler<'open'>>
  close: Array<EventHandler<'close'>>
  error: Array<EventHandler<'error'>>
  /** Receives the message event with `data` replaced by the decoded payload. */
  message: Array<AnyFunc<any, [WebSocketEventMap['message'] & {data: any}]>>
  /** Receives the data of the message whose response did not arrive in time. */
  timeout: Array<AnyFunc<any, [data: any]>>
}
22
/**
 * Produces a message id that is not currently a key of `q`.
 * Draws a random integer below `MAX_32 - 10`, encodes it with `zipnum`,
 * and redraws for as long as the result collides with an existing key.
 */
const genid = (q: AnyObject) => {
  const draw = () => zipnum.zip((random()*(MAX_32-10))|0)
  let candidate = draw()
  while(candidate in q) candidate = draw()
  return candidate
}
26
27
/**
 * Promise-based WebSocket client.
 *
 * Every outgoing message gets a generated id and is kept in a queue until a
 * response carrying the same id arrives or the configured timeout expires.
 * The client also handles keep-alive pings, closing after an idle period,
 * and automatic reconnection, all driven by the processed config.
 */
class WebSocketClient {
  /** Current socket, or null when there is no usable connection. */
  private ws: wsc.Socket|null = null
  /** True after close() was called; suppresses automatic reconnection. */
  private intentionally_closed = false
  /** Timer of the next scheduled reconnection attempt, if any. */
  private reconnect_timeout: NodeJS.Timeout|null = null
  /** Messages awaiting a response, keyed by message id. */
  private queue: Record<string, wsc.Message> = {}
  /** Resolvers of pending ready() promises. */
  private onReadyQueue: AnyFunc[] = []
  /** Callbacks to run once the socket reports that it has closed. */
  private onCloseQueue: AnyFunc[] = []
  private handlers: EventHandlers = { open: [], close: [], message: [], error: [], timeout: [] }
  private config = <wsc.Config>{}
  private ping_timer: NodeJS.Timeout|null = null
  private idle_timer: NodeJS.Timeout|null = null
  /** True while connect() is waiting for a socket to open. */
  private opening = false
  private get opened() { return this.ws?.readyState===1 }  // The only opened state.

  /** Empties the queue of pending messages in place. */
  private init_flush(): void {
    // TODO: reject them or save somehow ?..
    qfilter(F, this.queue)
  }

  /** Invokes every handler registered through on() for `event_name` with `args`. */
  private call(event_name: wsc.WSEvent, ...args: any[]) {
    for(const h of this.handlers[event_name]) h(...args)
  }

  /**
   * Forwards a log record to `config.log`.
   * With a `time`, logs (event, time, message). Without one, logs
   * (event, null, message) when `config.timer` is set and (event, message) otherwise.
   */
  private log(event: string, message: any = null, time: number|null = null): void {
    const {config} = this
    if(time === null) {
      if(config.timer) config.log(event, null, message)
      else config.log(event, message)
    } else {
      config.log(event, time, message)
    }
  }

  /** Cancels the ping and idle timers, if they are set. */
  private clearTimers() {
    if(this.ping_timer !== null) {
      clearTimeout(this.ping_timer)
      this.ping_timer = null
    }
    if(this.idle_timer !== null) {
      clearTimeout(this.idle_timer)
      this.idle_timer = null
    }
  }

  /**
   * (Re)schedules the next keep-alive ping `ping.interval` seconds from now.
   * Does nothing when pings are not configured.
   */
  private resetPing() {
    const {config: {ping}, ping_timer} = this
    if(ping) {
      if(!isNil(ping_timer))
        clearTimeout(ping_timer as NodeJS.Timeout)
      this.ping_timer = sett(ping.interval*1e3, async () => {
        const {ping_timer, opened} = this
        if(opened) {
          try {
            await this.send(ping.content, ping_send_opts)
            this.resetPing()
          } catch(err) {
            // A failed or timed-out ping must not escape the timer callback
            // as an unhandled promise rejection.
            this.log('error', err)
          }
        } else clearTimeout(ping_timer!)
      })
    }
  }

  /**
   * (Re)schedules closing the connection after `max_idle_time` seconds.
   * Does nothing when `max_idle_time` is Infinity.
   */
  private resetIdle() {
    const {config: {max_idle_time: time}, idle_timer} = this
    if(time!==Infinity) {
      if(!isNil(idle_timer)) clearTimeout(idle_timer!)
      this.idle_timer = sett(time*1e3, () => this.opened && this.close())
    }
  }

  /**
   * Adopts an open socket: resolves pending ready() promises, notifies 'open'
   * handlers, re-sends everything still in the queue, starts the ping and idle
   * timers, and subscribes to the socket's 'close' and 'message' events.
   */
  private initSocket(ws: wsc.Socket) {
    const {queue, config} = this
    this.ws = ws
    this.onReadyQueue.forEach((fn) => fn())
    this.onReadyQueue.splice(0)
    const {id_key, data_key} = config.server
    // Works also on previously opened sockets that do not fire 'open' event.
    this.call('open', ws)
    for(const msg_id in queue) ws.send(queue[msg_id].msg)
    if(this.reconnect_timeout !== null) {
      // The timer is created with setTimeout, so it is cleared with clearTimeout.
      clearTimeout(this.reconnect_timeout)
      this.reconnect_timeout = null
    }
    this.resetPing(); this.resetIdle()
    add_event(ws, 'close', async (...e) => {
      this.log('close')
      this.ws = null
      // No connection any more: drop pending ping/idle timers instead of leaving them armed.
      this.clearTimers()
      this.onCloseQueue.forEach(callit)
      this.onCloseQueue.splice(0)
      this.call('close', ...e)
      // Auto reconnect.
      let {reconnect, reconnection_attempts} = config
      if(isNumber(reconnect)) {
        const reconnectFunc = async () => {
          if(this.intentionally_closed || !reconnection_attempts) return;
          reconnection_attempts--
          this.log('reconnect')
          if(!isNil(this.ws)) {
            this.ws!.close()
            this.ws = null
          }
          // If some error occurred, try again.
          const status = await this.connect()
          if(!isNil(status))
            this.reconnect_timeout = setTimeout(reconnectFunc, reconnect*1e3)
        }
        // TODO: test normal close by server. Would it be infinite ?
        reconnectFunc()
      }
    })
    add_event(ws, 'message', (e) => {
      try {
        const data = config.decode(e.data)
        this.call('message', {...e, data})
        if(data[id_key]) {
          const q = this.queue[data[id_key]]
          if(q) {
            // Debug, Log.
            const time = q.sent_time ? (Date.now() - q.sent_time) : null
            this.log('message', data[data_key], time)
            // Play.
            q.ff(data[data_key])
          }
        }
      } catch (err) {
        console.error(err, `WSP: Decode error. Got: ${e.data}`)
      }
      this.resetPing()
    })
  }

  /**
   * Opens the connection unless it is already open or being opened.
   * @returns a promise of `null` on success (or when nothing had to be done),
   *   `2` when the obtained socket is missing or already closing/closed,
   *   `3` when the socket reported an error before opening.
   *   The promise rejects if obtaining the socket throws.
   */
  private connect() { // returns status if won't open or null if ok.
    return new Promise<null|number>((ff) => {
      if(this.opened||this.opening) return ff(null)
      this.opening = true
      const config = this.config
      const ws = (() => {
        try {
          return config.socket || config.adapter(config.url, config.protocols)
        } catch(e) {
          // Do not stay stuck in the "opening" state if the adapter throws;
          // rethrowing inside the executor rejects the returned promise.
          this.opening = false
          throw e
        }
      })()
      if(!ws || ws.readyState > 1) {
        this.opening = false
        this.ws = null
        this.log('error', 'ready() on closing or closed state! status 2.')
        return ff(2)
      }
      const ffo = once((s: null|number) => {this.opening=false; ff(s)})
      add_event(ws, 'error', once((e) => {
        this.ws = null
        this.log('error', 'status 3. Err: '+e.message)
        this.call('error', e)
        // Some network error: Connection refused or so.
        ffo(3)
      }))
      // Because 'open' won't be invoked on opened socket.
      if(ws.readyState) {
        this.initSocket(ws)
        ffo(null)
      } else {
        add_event(ws, 'open', once(() => {
          this.log('open')
          this.initSocket(ws)
          ffo(null)
        }))
      }
    })
  }

  /** The underlying socket, or null when there is none. */
  public get socket() { return this.ws }

  /**
   * Resolves immediately in lazy mode or when the socket is already open;
   * otherwise resolves once a socket has been initialized.
   */
  public async ready() {
    return new Promise<void>((ff) => {
      if(this.config.lazy) ff() // FIXME: (possibly) breaking change ?? At least minor ver bump with a notice!!!
      else if(this.opened) ff()
      else this.onReadyQueue.push(ff)
    })
  }

  /**
   * Subscribes `handler` to `event_name`; it is only called for events that pass `predicate`.
   * With `raw` set, the listener is attached directly to the current socket
   * (NOTE(review): this assumes a socket already exists) instead of the client's own handler list.
   * @returns the wrapping listener — the reference to pass to off().
   */
  public on(
    event_name: wsc.WSEvent,
    handler: (data: any) => any,
    predicate: (data: any) => boolean = T,
    raw = false
  ) {
    const _handler: wsc.EventHandler = (event) =>
      predicate(event) && handler(event)
    if(raw) add_event(this.ws as wsc.Socket, event_name, _handler)
    else this.handlers[event_name].push(_handler)
    return _handler
  }

  /**
   * Unsubscribes a listener. `handler` is looked up by identity, so it has to be
   * the function that on() returned.
   */
  public off(
    event_name: wsc.WSEvent,
    handler: (data: any) => any,
    raw = false
  ) {
    if(raw) return rm_event(this.ws as wsc.Socket, event_name, handler)
    const handlers = this.handlers[event_name]
    const i = handlers.indexOf(handler)
    if(~i) handlers.splice(i, 1)
  }

  /**
   * Closes the connection on purpose, which also disables auto reconnect.
   * Resolves with null after the socket reports the close and the message queue
   * has been flushed. Rejects with a string when there is no socket to close.
   */
  public async close(): wsc.AsyncErrCode {
    return new Promise((ff, rj) => {
      if(this.ws === null) {
        rj('WSP: closing a non-inited socket!')
      } else {
        this.onCloseQueue.push(() => {
          this.init_flush()
          ff(null)
        })
        // Raise the flag before closing so that a close event delivered
        // synchronously cannot start a reconnection.
        this.intentionally_closed = true
        this.ws.close()
        this.ws = null
      }
    })
  }

  /** Opens the connection if it is not open; returns connect()'s promise in that case. */
  public open() {
    if(!this.opened) {
      this.intentionally_closed = false
      return this.connect()
    }
  }

  // TODO: Add a setter for config items so that timeouts can be adjusted.
  // And an event for when the message follows our schema but there is no matching queue item.
  // Or add a flag to the 'message' event.
  // And a 'line' event with the value on: boolean. Criteria?
  /**
   * Shared first half of send() and stream(): logs, generates an id, runs the
   * data through the configured pipes, encodes it while making sure the
   * connection is being opened, and sends it right away if the socket is open.
   * @returns the message id, the encoded message, a `timeout(rj)` function that
   *   arms the response timeout, and a `cleanup()` that removes the queue entry.
   * @throws Error when `opts.top` carries the data key, or when connecting fails.
   */
  private async prepareMessage<RequestDataType = any>(
    message_data: RequestDataType,
    opts = <wsc.SendOptions>{}
  ) {
    this.log(opts._is_ping ? 'ping' : 'send', message_data)
    const {config, queue} = this
    const {pipes, server: {data_key}} = config
    const {top, _is_ping} = opts
    const id = genid(queue)
    // typeof null is 'object' too, hence the explicit null check.
    if(typeof top === 'object' && top !== null) {
      if(top[data_key]) {
        throw new Error(`Attempting to set data key/token via ${opts._is_ping ? 'ping' : 'send'}() options!`)
      }
    }
    for(const pipe of pipes) message_data = pipe(message_data)
    const [msg, err] = await Promise.all([
      config.encode(id, message_data, config),
      this.connect()
    ])
    if(err) throw new Error('ERR while opening connection #'+err)
    if(this.opened) {
      this.ws!.send(msg)
      this.resetPing()
      // Pings do not count as activity for the idle timer.
      if(!_is_ping) this.resetIdle()
    }
    const cleanup = () => delete this.queue[id]
    const timeout = (rj: AnyFunc) => sett(config.timeout, () => {
      if(id in queue) {
        this.call('timeout', message_data)
        rj({'Websocket timeout expired': config.timeout, 'for the message': message_data})
        cleanup()
      }
    })
    return { message_id: id, msg, timeout, cleanup }
  }

  /**  .send(your_data) wraps request to server with {id: `hash`, data: `actually your data`},
    returns a Promise that will be rejected after a timeout or
    resolved if server returns the same signature: {id: `same_hash`, data: `response data`}.
  */
  public async send<RequestDataType = any, ResponseDataType = any>(
    message_data: RequestDataType,
    opts = <wsc.SendOptions>{}
  ): Promise<ResponseDataType> {
    const {message_id, msg, timeout, cleanup} = await this.prepareMessage(message_data, opts)
    const {queue, config} = this

    return new Promise<ResponseDataType>((ff, rj) => {
      const to = timeout(rj)
      queue[message_id] = {
        msg,
        data_type: config.data_type,
        sent_time: config.timer ? Date.now() : null,
        ff(x: any) {
          clearTimeout(to)
          ff(x)
        }
      }
    }).finally(cleanup)
  }

  /**
   * Like send(), but yields every response that carries the request's id until
   * one arrives with a truthy `done` field (that last one is yielded as well).
   * Responses that arrive while the consumer is busy are buffered, not dropped.
   * The timeout is armed only while waiting for the next response; when it
   * expires the pending iteration rejects. The queue entry and any armed timer
   * are released however the iteration ends.
   */
  public async *stream<RequestDataType = any, ResponseDataType = any>(
    message_data: RequestDataType,
    opts = <wsc.SendOptions>{}
  ): AsyncGenerator<ResponseDataType, void, unknown> {
    const {message_id, msg, timeout, cleanup} = await this.prepareMessage(message_data, opts)
    const {queue, config} = this
    const pending: ResponseDataType[] = []
    let done = false, fulfill: AnyFunc|null = null, to: NodeJS.Timeout|null = null
    const stopTimer = () => {
      if(to) {clearTimeout(to); to=null}
    }
    queue[message_id] = {
      msg,
      ff: (chunk: ResponseDataType&{done?: boolean}) => {
        stopTimer()
        if(chunk?.done) { cleanup(); done=true }
        if(fulfill) {
          // Hand the chunk to the iteration that is currently waiting.
          const deliver = fulfill
          fulfill = null
          deliver(chunk)
        } else pending.push(chunk) // Nobody is waiting yet: keep it for the next iteration.
      },
      data_type: config.data_type,
      sent_time: config.timer ? Date.now() : null
    }
    try {
      while(true) {
        if(pending.length) yield pending.shift() as ResponseDataType
        else if(done) break
        else yield await new Promise<ResponseDataType>((ff, rj) => {
          to = timeout(rj), fulfill=ff
        })
      }
    } finally {
      // Also reached when the consumer stops early or the timeout rejects.
      stopTimer()
      cleanup()
    }
  }

  // TODO: Add .on handlers to config!
  /** Processes the user config and, unless `lazy` is set, starts connecting right away. */
  constructor(user_config: wsc.UserConfig = {}) {
    this.config = processConfig(user_config)
    if(!this.config.lazy) this.connect()
  }
}
322
323
/* TODO: v3: @.deprecated. Use named import { WebSocketClient } instead. */
324
export default WebSocketClient